package org.feng.config;

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.feng.common.GsonUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

/**
 * WebClient configuration. See the official documentation:
 * https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-client
 *
 * <br>References:
 * <br> https://www.freesion.com/article/7381889590/
 * <br> https://blog.csdn.net/weixin_44266223/article/details/122967933
 * <br> https://www.thinbug.com/q/51726943
 *
 * @version V1.0
 * @author: fengjinsong
 * @date: 2022-06-16 15:41
 */
@Slf4j
@Configuration
public class WebClientConfig {

    /**
     * Builds a resource factory that does not use the global Reactor Netty resources.
     *
     * <p>By default an {@code HttpClient} participates in the global Reactor Netty resources held in
     * {@code reactor.netty.http.HttpResources} (event loop threads and connection pool). Here a
     * dedicated event loop group and a dedicated connection pool are configured instead.
     *
     * <p>NOTE(review): this factory is created by a private method and is not registered as a bean,
     * so the container does not run lifecycle callbacks on it. The loop resources and connection
     * provider are supplied explicitly; confirm whether they need to be disposed on shutdown.
     *
     * @return the resource factory
     */
    private ReactorResourceFactory reactorResourceFactory() {
        ReactorResourceFactory factory = new ReactorResourceFactory();
        factory.setUseGlobalResources(false);
        // Dedicated event loop for HTTP I/O: 30 worker threads, created as daemon threads.
        factory.setLoopResources(LoopResources.create("tcp-connect-loop", 30, true));
        // Fixed-size connection pool.
        factory.setConnectionProvider(connectionProvider());
        return factory;
    }

    /**
     * Builds the connection pool used by the {@link WebClient}.
     *
     * @return a connection provider named {@code tcp-connect-pool}
     */
    private ConnectionProvider connectionProvider() {
        return ConnectionProvider
                .builder("tcp-connect-pool")
                // Maximum time to wait for a connection from the pool.
                .pendingAcquireTimeout(Duration.ofSeconds(6))
                // Maximum number of connections in the pool.
                .maxConnections(30)
                // Maximum number of queued acquire requests.
                .pendingAcquireMaxCount(300)
                .maxIdleTime(Duration.ofSeconds(200))
                .maxLifeTime(Duration.ofSeconds(200))
                .build();
    }

    /**
     * Creates the application {@link WebClient} backed by a Reactor Netty connector with explicit
     * connect, read/write and response timeouts, a 4 MiB in-memory codec buffer limit and a
     * request-logging filter.
     *
     * @return the configured WebClient
     */
    @Bean
    public WebClient webClient() {
        // HttpClient is immutable: every configuration call returns a NEW instance, so the mapper
        // must return the result of the chain. Returning the incoming client would silently drop
        // all of the timeout settings below.
        Function<HttpClient, HttpClient> mapper = client -> client
                // Connect timeout.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                // Read and write timeouts (in seconds) once the connection is established.
                .doOnConnected(connection -> connection
                        .addHandlerLast(new ReadTimeoutHandler(10))
                        .addHandlerLast(new WriteTimeoutHandler(10)))
                // Response timeout.
                .responseTimeout(Duration.of(6, ChronoUnit.SECONDS));

        ClientHttpConnector connector = new ReactorClientHttpConnector(reactorResourceFactory(), mapper);

        return WebClient.builder()
                // Raise the codecs' in-memory buffer limit to 4 MiB.
                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(4 * 1024 * 1024))
                .filter(logFilter())
                .clientConnector(connector)
                .build();
    }

    /**
     * Builds a filter that logs each outgoing request before passing it down the chain.
     *
     * <p>NOTE(review): {@code request.body()} is a {@code BodyInserter}, not the payload itself, so
     * the JSON produced for it may not reflect the real request body — verify against
     * {@code GsonUtil}. Headers are logged in full at INFO level; confirm that no credentials
     * (e.g. {@code Authorization}) can end up in the logs.
     *
     * @return the logging filter
     */
    private ExchangeFilterFunction logFilter() {
        return (request, next) -> {
            log.info("请求路径： {}", request.url());
            log.info("请求头： {}", request.headers());
            log.info("请求body：{}", GsonUtil.toJson(request.body()));
            log.info("请求属性：{}", request.attributes());
            log.info("请求方式：{}", request.method());
            return next.exchange(request);
        };
    }
}
